import 'dart:async';

import 'package:async/async.dart';
import 'package:async_test/src/utils.dart';

import '../common/test_page.dart';

class StreamGroupTestPage extends TestPage {
  StreamGroupTestPage(super.title) {
    group('StreamGroup<String>()单一订阅', () {
      late StreamGroup<String> streamGroup;
      setUp() {
        streamGroup = StreamGroup<String>();
      }

      test(
          'StreamGroup<String>().add(), StreamGroup<String>().close(), StreamGroup<String>().stream.toList()  缓冲来自多个源的事件',
          () async {
        setUp();
        var controller1 = StreamController<String>();
        streamGroup.add(controller1.stream);
        controller1.add('first');
        controller1.close();

        var controller2 = StreamController<String>();
        streamGroup.add(controller2.stream);
        controller2.add('second');
        controller2.close();

        await flushMicrotasks();

        expect(streamGroup.close());

        expect(streamGroup.stream.toList());
      });

      test('StreamGroup<String>().stream.transform() 缓冲来自多个源的错误', () async {
        setUp();
        var controller1 = StreamController<String>();
        streamGroup.add(controller1.stream);
        controller1.addError('first');
        controller1.close();

        var controller2 = StreamController<String>();
        streamGroup.add(controller2.stream);
        controller2.addError('second');
        controller2.close();

        await flushMicrotasks();

        expect(streamGroup.close());

        var transformed = streamGroup.stream.transform(
            StreamTransformer<String, String>.fromHandlers(
                handleError: (error, _, sink) => sink.add('error: $error')));
        expect(transformed.toList());
      });

      test('将事件和错误缓冲在一起', () async {
        setUp();
        var controller = StreamController<String>();
        streamGroup.add(controller.stream);

        controller.add('first');
        controller.addError('second');
        controller.add('third');
        controller.addError('fourth');
        controller.addError('fifth');
        controller.add('sixth');
        controller.close();

        await flushMicrotasks();

        expect(streamGroup.close());

        var transformed = streamGroup.stream.transform(
            StreamTransformer<String, String>.fromHandlers(
                handleData: (data, sink) => sink.add('data: $data'),
                handleError: (error, _, sink) => sink.add('error: $error')));
        expect(transformed.toList());
      });

      test("一旦有侦听器就发出事件", () {
        setUp();
        var controller = StreamController<String>();
        streamGroup.add(controller.stream);

        expect(streamGroup.stream.toList());

        controller.add('first');
        controller.add('second');
        controller.close();

        expect(streamGroup.close());
      });

      test("不缓冲广播流中的事件", () async {
        setUp();
        var controller = StreamController<String>.broadcast();
        streamGroup.add(controller.stream);

        controller.add('first');
        controller.add('second');
        controller.close();

        await flushMicrotasks();

        expect(streamGroup.close());
        expect(streamGroup.stream.toList());
      });

      test('暂停时，缓冲广播流中的事件', () async {
        setUp();
        var controller = StreamController<String>.broadcast();
        streamGroup.add(controller.stream);

        var events = [];
        var subscription = streamGroup.stream.listen(events.add);
        subscription.pause();

        controller.add('first');
        controller.add('second');
        controller.close();
        await flushMicrotasks();

        subscription.resume();
        expect(streamGroup.close());
        await flushMicrotasks();

        expect(events);
      });

      test("一旦有侦听器，就从广播流中发出事件", () {
        setUp();
        var controller = StreamController<String>.broadcast();
        streamGroup.add(controller.stream);

        expect(streamGroup.stream.toList());

        controller.add('first');
        controller.add('second');
        controller.close();

        expect(streamGroup.close());
      });

      test('转发取消错误', () async {
        setUp();
        var subscription = streamGroup.stream.listen(null);

        var controller =
            StreamController<String>(onCancel: () => throw 'error');
        streamGroup.add(controller.stream);
        await flushMicrotasks();

        expect(subscription.cancel());
      });

      test('转发取消未来', () async {
        setUp();
        var subscription = streamGroup.stream.listen(null);

        var completer = Completer();
        var controller =
            StreamController<String>(onCancel: () => completer.future);
        streamGroup.add(controller.stream);
        await flushMicrotasks();

        var fired = false;
        subscription.cancel().then((_) => fired = true);

        await flushMicrotasks();
        expect(fired);

        completer.complete();
        await flushMicrotasks();
        expect(fired);
      });

      test('add()活动时，如果组暂停，则暂停流，然后在组恢复后恢复', () async {
        setUp();
        var subscription = streamGroup.stream.listen(null);
        await flushMicrotasks();

        var paused = false;
        var controller = StreamController<String>(
            onPause: () => paused = true, onResume: () => paused = false);

        subscription.pause();
        await flushMicrotasks();

        streamGroup.add(controller.stream);
        await flushMicrotasks();
        expect(paused);

        subscription.resume();
        await flushMicrotasks();
        expect(paused);
      });

      setUp1() async {
        streamGroup.stream.listen(null).cancel();
        await flushMicrotasks();
      }

      test('取消时add()立即收听并取消流', () async {
        setUp();
        setUp1();
        var listened = false;
        var canceled = false;
        var controller = StreamController<String>(onListen: () {
          listened = true;
        }, onCancel: () {
          expect(listened);
          canceled = true;
        });

        streamGroup.add(controller.stream);
        await flushMicrotasks();
        expect(listened);
        expect(canceled);
      });

      test('取消时add()转发取消错误', () {
        setUp();
        setUp1();
        var controller =
            StreamController<String>(onCancel: () => throw 'error');

        expect(streamGroup.add(controller.stream));
      });

      test('取消时add()转发取消未来', () async {
        setUp();
        setUp1();
        var completer = Completer();
        var controller =
            StreamController<String>(onCancel: () => completer.future);

        var fired = false;
        streamGroup.add(controller.stream)!.then((_) => fired = true);

        await flushMicrotasks();
        expect(fired);

        completer.complete();
        await flushMicrotasks();
        expect(fired);
      });

      late Stream<String> alreadyListened;
      setUp2() {
        alreadyListened = Stream.value('foo')..listen(null);
      }

      test('当listen()抛出错误时listen()重新思考该错误', () {
        setUp();
        setUp2();
        streamGroup.add(alreadyListened);

        runZonedGuarded(() => streamGroup.stream.listen((_) {}),
            (error, _) => expect(error));
      });

      test('当listen()抛出错误时listen()取消其他订阅', () async {
        setUp();
        setUp2();
        var firstCancelled = false;
        var first =
            StreamController<String>(onCancel: () => firstCancelled = true);
        streamGroup.add(first.stream);

        streamGroup.add(alreadyListened);

        var lastCancelled = false;
        var last =
            StreamController<String>(onCancel: () => lastCancelled = true);
        streamGroup.add(last.stream);

        runZonedGuarded(() => streamGroup.stream.listen(null), (_, __) {});

        expect(firstCancelled);
        expect(lastCancelled);
      });

      test('当listen()抛出错误时取消订阅是不可行的 同步地', () {
        setUp();
        setUp2();
        streamGroup.add(alreadyListened);

        var subscription =
            runZonedGuarded(() => streamGroup.stream.listen(null), (_, __) {});

        expect(subscription!.cancel());
      });

      test('当listen()抛出错误时取消订阅是不可行的 异步地', () async {
        setUp();
        setUp2();
        streamGroup.add(alreadyListened);

        var subscription =
            runZonedGuarded(() => streamGroup.stream.listen(null), (_, __) {});

        await pumpEventQueue();
        expect(subscription!.cancel());
      });
    });

    group('StreamGroup<String>.broadcast()', () {
      late StreamGroup<String> streamGroup;
      setUp() {
        streamGroup = StreamGroup<String>.broadcast();
      }

      test(
          'StreamGroup<String>.broadcast().add(),StreamGroup<String>.broadcast().close(),StreamGroup<String>.broadcast().stream.toList() 缓冲来自多个源的事件',
          () async {
        setUp();
        var controller1 = StreamController<String>();
        streamGroup.add(controller1.stream);
        controller1.add('first');
        controller1.close();

        var controller2 = StreamController<String>();
        streamGroup.add(controller2.stream);
        controller2.add('second');
        controller2.close();

        await flushMicrotasks();

        expect(streamGroup.close());

        expect(streamGroup.stream.toList());
      });

      test("一旦有侦听器，就从多个源发出事件", () {
        setUp();
        var controller1 = StreamController<String>();
        streamGroup.add(controller1.stream);

        var controller2 = StreamController<String>();
        streamGroup.add(controller2.stream);

        expect(streamGroup.stream.toList());

        controller1.add('first');
        controller2.add('second');
        controller1.close();
        controller2.close();

        expect(streamGroup.close());
      });

      test("添加和删除侦听器后不缓冲事件", () async {
        setUp();
        var controller = StreamController<String>();
        streamGroup.add(controller.stream);

        streamGroup.stream.listen(null).cancel();
        await flushMicrotasks();

        controller.add('first');
        controller.addError('second');
        controller.close();

        await flushMicrotasks();

        expect(streamGroup.close());
        expect(streamGroup.stream.toList());
      });

      test("不缓冲广播流中的事件", () async {
        setUp();
        var controller = StreamController<String>.broadcast();
        streamGroup.add(controller.stream);
        controller.add('first');
        controller.addError('second');
        controller.close();

        await flushMicrotasks();

        expect(streamGroup.close());
        expect(streamGroup.stream.toList());
      });

      test("一旦有侦听器，就从广播流中发出事件", () {
        setUp();
        var controller = StreamController<String>.broadcast();
        streamGroup.add(controller.stream);

        expect(streamGroup.stream.toList());

        controller.add('first');
        controller.add('second');
        controller.close();

        expect(streamGroup.close());
      });

      test('取消并重新收听广播流', () async {
        setUp();
        var subscription = streamGroup.stream.listen(null);

        var controller = StreamController<String>.broadcast();

        streamGroup.add(controller.stream);
        await flushMicrotasks();
        expect(controller.hasListener);

        subscription.cancel();
        await flushMicrotasks();
        expect(controller.hasListener);

        streamGroup.stream.listen(null);
        await flushMicrotasks();
        expect(controller.hasListener);
      });

      test('在取消后重新侦听单个订阅流之后的流', () async {
        setUp();
        var controller1 = StreamController<String>();
        streamGroup.add(controller1.stream);
        streamGroup.stream.listen(null).cancel();

        var controller2 = StreamController<String>();
        streamGroup.add(controller2.stream);

        var emitted = <String>[];
        streamGroup.stream.listen(emitted.add);
        controller1.add('one');
        controller2.add('two');
        await flushMicrotasks();
        expect(emitted);
      });

      test('从不取消单个订阅流', () async {
        setUp();
        var subscription = streamGroup.stream.listen(null);

        var controller = StreamController<String>(onCancel: () {});

        streamGroup.add(controller.stream);
        await flushMicrotasks();

        subscription.cancel();
        await flushMicrotasks();

        streamGroup.stream.listen(null);
        await flushMicrotasks();
      });

      test('在休眠时从单个订阅流中删除事件', () async {
        setUp();
        var events = [];
        var subscription = streamGroup.stream.listen(events.add);

        var controller = StreamController<String>();
        streamGroup.add(controller.stream);
        await flushMicrotasks();

        controller.add('first');
        await flushMicrotasks();
        expect(events);

        subscription.cancel();
        controller.add('second');
        await flushMicrotasks();
        expect(events);

        streamGroup.stream.listen(events.add);
        controller.add('third');
        await flushMicrotasks();
        expect(events);
      });

      test('单个订阅流可以在休眠时删除', () async {
        setUp();
        var controller = StreamController<String>();
        streamGroup.add(controller.stream);
        await flushMicrotasks();

        streamGroup.stream.listen(null).cancel();
        await flushMicrotasks();

        streamGroup.remove(controller.stream);
        expect(controller.hasListener);
        await flushMicrotasks();

        expect(streamGroup.stream.toList());
        controller.add('first');
        expect(streamGroup.close());
      });
    });

    regardlessOfType(StreamGroup<String>.new);

    regardlessOfType(StreamGroup<String>.broadcast);

    test('StreamGroup.merge()从所有组件流中发出事件', () async {
      var controller1 = StreamController<String>();
      var controller2 = StreamController<String>();

      var merged = StreamGroup.merge([controller1.stream, controller2.stream]);

      controller1.add('first');
      controller1.close();
      controller2.add('second');
      controller2.close();

      expect(await merged.toList());
    });

    test('StreamGroup.mergeBroadcast() 从所有组件流中发出事件', () async {
      var controller1 = StreamController<String>();
      var controller2 = StreamController<String>();

      var merged = StreamGroup.mergeBroadcast([controller1.stream, controller2.stream]);

      controller1.add('first');
      controller1.close();
      controller2.add('second');
      controller2.close();

      expect(merged.isBroadcast);

      expect(await merged.toList());
    });
  }

  void regardlessOfType(StreamGroup<String> Function() newStreamGroup) {
    late StreamGroup<String> streamGroup;
    setUp() {
      streamGroup = newStreamGroup();
    }

    group('${newStreamGroup.toString()}.add()', () {
      test("休眠时,在收听组之前不收听流", () async {
        setUp();
        var controller = StreamController<String>();

        expect(streamGroup.add(controller.stream));
        await flushMicrotasks();
        expect(controller.hasListener);

        streamGroup.stream.listen(null);
        await flushMicrotasks();
        expect(controller.hasListener);
      });

      test('休眠时,如果流已经在组中，则为no-op', () {
        setUp();
        var controller = StreamController<String>();
        streamGroup.add(controller.stream);
        streamGroup.add(controller.stream);
        streamGroup.add(controller.stream);
        expect(controller.hasListener);

        streamGroup.stream.listen(null);
        expect(controller.hasListener);
      });

      setUp1() async {
        streamGroup.stream.listen(null);
        await flushMicrotasks();
      }

      test('活动时, 立即收听流', () async {
        setUp();
        setUp1();
        var controller = StreamController<String>();

        expect(streamGroup.add(controller.stream));
        await flushMicrotasks();
        expect(controller.hasListener);
      });

      test('活动时, 如果流已经在组中，则为no-op', () async {
        setUp();
        setUp1();
        var controller = StreamController<String>();

        streamGroup.add(controller.stream);
        streamGroup.add(controller.stream);
        streamGroup.add(controller.stream);
        expect(controller.hasListener);
      });
    });

    group('${newStreamGroup.toString()}.remove()', () {
      test("休眠时, 停止为已删除的流发出事件", () async {
        setUp();
        var controller = StreamController<String>();
        streamGroup.add(controller.stream);

        expect(streamGroup.stream.toList());

        controller.add('first');
        await flushMicrotasks();
        controller.add('second');

        expect(streamGroup.remove(controller.stream));
        expect(streamGroup.close());
      });

      test('休眠时, 是未知流的非操作', () {
        setUp();
        var controller = StreamController<String>();
        expect(streamGroup.remove(controller.stream));
      });

      test('休眠时, 并在移除最后一个流时关闭组', () async {
        setUp();
        var controller1 = StreamController<String>();
        var controller2 = StreamController<String>();

        streamGroup.add(controller1.stream);
        streamGroup.add(controller2.stream);
        await flushMicrotasks();

        expect(streamGroup.isClosed);
        streamGroup.close();
        expect(streamGroup.isClosed);

        streamGroup.remove(controller1.stream);
        await flushMicrotasks();

        streamGroup.remove(controller2.stream);
        await flushMicrotasks();

        expect(streamGroup.stream.toList());
      });

      test("监听时, 不从已删除的流中发出事件", () {
        setUp();
        var controller = StreamController<String>();
        streamGroup.add(controller.stream);

        expect(streamGroup.stream.toList());

        controller.add('first');
        expect(streamGroup.remove(controller.stream));
        controller.add('second');

        expect(streamGroup.close());
      });

      test("监听时, 取消流的订阅", () async {
        setUp();
        var controller = StreamController<String>();
        streamGroup.add(controller.stream);

        streamGroup.stream.listen(null);
        await flushMicrotasks();
        expect(controller.hasListener);

        streamGroup.remove(controller.stream);
        await flushMicrotasks();
        expect(controller.hasListener);
      });

      test('监听时, 转发取消错误', () async {
        setUp();
        var controller = StreamController<String>(onCancel: () => throw 'error');
        streamGroup.add(controller.stream);

        streamGroup.stream.listen(null);
        await flushMicrotasks();

        expect(streamGroup.remove(controller.stream));
      });

      test('监听时, 远期取消期货', () async {
        setUp();
        var completer = Completer();
        var controller = StreamController<String>(onCancel: () => completer.future);

        streamGroup.stream.listen(null);
        await flushMicrotasks();

        streamGroup.add(controller.stream);
        await flushMicrotasks();

        var fired = false;
        streamGroup.remove(controller.stream)!.then((_) => fired = true);

        await flushMicrotasks();
        expect(fired);

        completer.complete();
        await flushMicrotasks();
        expect(fired);
      });

      test('监听时, 是未知流的非操作', () async {
        setUp();
        var controller = StreamController<String>();
        streamGroup.stream.listen(null);
        await flushMicrotasks();

        expect(streamGroup.remove(controller.stream));
      });

      test('监听时, 并在移除最后一个流时关闭组', () async {
        setUp();
        var done = false;
        streamGroup.stream.listen(null, onDone: () => done = true);
        await flushMicrotasks();

        var controller1 = StreamController<String>();
        var controller2 = StreamController<String>();

        streamGroup.add(controller1.stream);
        streamGroup.add(controller2.stream);
        await flushMicrotasks();

        streamGroup.close();

        streamGroup.remove(controller1.stream);
        await flushMicrotasks();
        expect(done);

        streamGroup.remove(controller2.stream);
        await flushMicrotasks();
        expect(done);
      });
    });

    group('${newStreamGroup.toString()}.close()', () {
        test('休眠时, 如果没有流，则关闭组', () {
          setUp();
          expect(streamGroup.close());
          expect(streamGroup.stream.toList());
        });

        test('休眠时, 如果存在流，则在这些流关闭并且存在侦听器后关闭组', () async {
          setUp();
          var controller1 = StreamController<String>();
          var controller2 = StreamController<String>();

          streamGroup.add(controller1.stream);
          streamGroup.add(controller2.stream);
          await flushMicrotasks();

          streamGroup.close();

          controller1.close();
          controller2.close();
          expect(streamGroup.stream.toList());
        });

        test('活动时, 如果没有流，则关闭组', () {
          setUp();
          expect(streamGroup.stream.toList());
          expect(streamGroup.close());
        });

        test('活动时, 如果存在流，则在这些流关闭后关闭组', () async {
          setUp();
          var done = false;
          streamGroup.stream.listen(null, onDone: () => done = true);
          await flushMicrotasks();

          var controller1 = StreamController<String>();
          var controller2 = StreamController<String>();

          streamGroup.add(controller1.stream);
          streamGroup.add(controller2.stream);
          await flushMicrotasks();

          streamGroup.close();
          await flushMicrotasks();
          expect(done);

          controller1.close();
          await flushMicrotasks();
          expect(done);

          controller2.close();
          await flushMicrotasks();
          expect(done);
        });

      test('活动时, 返回所有事件调度后完成的Future', () async {
        setUp();
        var events = [];
        streamGroup.stream.listen(events.add);

        var controller = StreamController<String>();
        streamGroup.add(controller.stream);
        await flushMicrotasks();

        controller.add('one');
        controller.add('two');
        controller.add('three');
        controller.add('four');
        controller.add('five');
        controller.add('six');
        controller.close();

        await streamGroup.close();
        expect(events);
      });
    });

    group('${newStreamGroup.toString()}.onIdle', () {
      test('当最后一个挂起的流发出done时发出事件', () async {
        setUp();
        streamGroup.stream.listen(null);

        var idle = false;
        streamGroup.onIdle.listen((_) => idle = true);

        var controller1 = StreamController<String>();
        var controller2 = StreamController<String>();
        var controller3 = StreamController<String>();

        streamGroup.add(controller1.stream);
        streamGroup.add(controller2.stream);
        streamGroup.add(controller3.stream);

        await flushMicrotasks();
        expect(idle);
        expect(streamGroup.isIdle);

        controller1.close();
        await flushMicrotasks();
        expect(idle);
        expect(streamGroup.isIdle);

        controller2.close();
        await flushMicrotasks();
        expect(idle);
        expect(streamGroup.isIdle);

        controller3.close();
        await flushMicrotasks();
        expect(idle);
        expect(streamGroup.isIdle);
      });

      test('每次空闲时都会发出一个事件', () async {
        setUp();
        streamGroup.stream.listen(null);

        var idle = false;
        streamGroup.onIdle.listen((_) => idle = true);

        var controller = StreamController<String>();
        streamGroup.add(controller.stream);

        controller.close();
        await flushMicrotasks();
        expect(idle);
        expect(streamGroup.isIdle);

        idle = false;
        controller = StreamController<String>();
        streamGroup.add(controller.stream);

        await flushMicrotasks();
        expect(idle);
        expect(streamGroup.isIdle);

        controller.close();
        await flushMicrotasks();
        expect(idle);
        expect(streamGroup.isIdle);
      });

      test('当组关闭时发出事件', () async {
        setUp();
        var idle = false;
        var onIdleDone = false;
        var streamClosed = false;

        streamGroup.onIdle.listen((_) {
          expect(streamClosed);
          idle = true;
        }, onDone: () {
          expect(idle);
          expect(streamClosed);
          onIdleDone = true;
        });

        streamGroup.stream.drain().then((_) {
          expect(idle);
          expect(onIdleDone);
          streamClosed = true;
        });

        var controller = StreamController<String>();
        streamGroup.add(controller.stream);
        streamGroup.close();

        await flushMicrotasks();
        expect(idle);
        expect(streamGroup.isIdle);

        controller.close();
        await flushMicrotasks();
        expect(idle);
        expect(streamGroup.isIdle);
        expect(streamClosed);
      });
    });
  }
}